Step FunctionsでPinpointのセグメント作成とキャンペーン配信を自動化してみた
こんにちは。たかやまです。
今回はS3にPinpointのセグメント情報が配置されたら、自動でインポートからキャンペーンを配信するStep Functionsを作っていこうと思います。
今回の検証環境のサンプルコードはこちらです。(Pinpointのプロジェクトは含まず)
構成図
今回の構成は以下の通りです。
やってみた
Lambda
Lambdaはセグメントインポート用とキャンペーン作成用の2つ作成します。
セグメントインポート用ではS3のEventBridge通知からS3ファイルパス抽出と日付プレフィックスを付与したセグメントを作成しています。
EventBridge通知例
{ "version": "0", "id": "05902619-81be-71bb-4029-f9629997a5ef", "detail-type": "Object Created", "source": "aws.s3", "account": "YOUR_ACCOUNT", "time": "2022-08-08T13:46:16Z", "region": "ap-northeast-1", "resources": [ "arn:aws:s3:::pinpoint-stepfunctions-dev-s3" ], "detail": { "version": "0", "bucket": { "name": "pinpoint-stepfunctions-dev-s3" }, "object": { "key": "pinpoint_example_import_3.csv", "size": 11344, "etag": "6abfade02d8367a3fab8e0bf494aec4c", "sequencer": "0062F113A853D8F276" }, "request-id": "H38YC5YNWZXXP3N2", "requester": "YOUR_ACCOUNT", "source-ip-address": "8.37.43.7", "reason": "PutObject" } }
import datetime import logging import os import boto3 from botocore.exceptions import ClientError # Lambda environment variable application_id = os.environ["APPLICATION_ID"] pinpoinr_role = os.environ["PINPOINT_ROLE"] # Data setting t_delta = datetime.timedelta(hours=9) JST = datetime.timezone(t_delta, "JST") now = datetime.datetime.now(JST) date = now.strftime("%m%d") # Log setting logger = logging.getLogger(__name__) client = boto3.client("pinpoint") def lambda_handler(event, context): # Extracted S3 file path bucket_name = event["detail"]["bucket"]["name"] object_key = event["detail"]["object"]["key"] try: response = client.create_import_job( ApplicationId=application_id, ImportJobRequest={ "DefineSegment": True, "Format": "CSV", "RegisterEndpoints": True, "RoleArn": pinpoinr_role, "S3Url": "s3://{}/{}".format(bucket_name, object_key), "SegmentName": "{}_segment".format(date), }, ) except ClientError: logger.exception("Could not import segment") raise else: return response
Pinpoint - create_import_job — Boto3 Docs 1.24.46 documentation
以下の部分でS3のEventBridge通知からS3のファイルパス抽出を行っています。
# Extracted S3 file path bucket_name = event["detail"]["bucket"]["name"] object_key = event["detail"]["object"]["key"]
キャンペーン作成用ではセグメントインポート処理から連携されるセグメントIDの抽出と日付プレフィックスを付与したキャンペーンを作成しています。
import datetime import logging import os import boto3 from botocore.exceptions import ClientError # Lambda environment variable application_id = os.environ["APPLICATION_ID"] template = os.environ["TEMPLATE"] # Data setting t_delta = datetime.timedelta(hours=9) JST = datetime.timezone(t_delta, "JST") now = datetime.datetime.now(JST) date = now.strftime("%m%d") # Log setting logger = logging.getLogger(__name__) client = boto3.client("pinpoint") def lambda_handler(event, context): # Extracted Segment Id segmetn_id = event["ImportJobResponse"]["Definition"]["SegmentId"] try: response = client.create_campaign( ApplicationId=application_id, WriteCampaignRequest={ "Name": "{}_email_campaign".format(date), "Schedule": { "StartTime": "IMMEDIATE", }, "SegmentId": segmetn_id, "SegmentVersion": 1, "TemplateConfiguration": { "EmailTemplate": { "Name": template, }, }, }, ) except ClientError: logger.exception("Could not create campaign") raise else: return response
Pinpoint - create_campaign — Boto3 Docs 1.24.46 documentation
以下の部分でセグメントインポート処理からセグメントIDの抽出を行っています。
# Extracted Segment Id segmetn_id = event["ImportJobResponse"]["Definition"]["SegmentId"]
ここではEmail向けのキャンペーンを作成していますが、テンプレートの指定を書き換えることでSMS配信を指定することができます。以下の例ではSMS向けの設定をしています。
"TemplateConfiguration": { "SMSTemplate": { "Name": template, },
Step Functions
ワークフローの全体イメージはこちらです。
Step Functions定義
{ "StartAt": "import-segment", "States": { "import-segment": { "Next": "parallel", "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "Type": "Task", "OutputPath": "$.Payload", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "FunctionName": "arn:aws:lambda:ap-northeast-1:YOUR_AWSACCOUNT_ID:function:pinpoint-stepfunctions-dev-import-segment-function", "Payload.$": "$" } }, "parallel": { "Type": "Parallel", "End": true, "Branches": [ { "StartAt": "create-email-campagin", "States": { "create-email-campagin": { "End": true, "Retry": [ { "ErrorEquals": [ "States.ALL" ], "IntervalSeconds": 2, "MaxAttempts": 5, "BackoffRate": 2 } ], "Type": "Task", "OutputPath": "$.Payload", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "FunctionName": "arn:aws:lambda:ap-northeast-1:YOUR_AWSACCOUNT_ID:function:pinpoint-stepfunctions-dev-create-email-campagin-fucntion", "Payload.$": "$" } } } }, { "StartAt": "create-sms-campagin", "States": { "create-sms-campagin": { "End": true, "Retry": [ { "ErrorEquals": [ "States.ALL" ], "IntervalSeconds": 2, "MaxAttempts": 5, "BackoffRate": 2 } ], "Type": "Task", "OutputPath": "$.Payload", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "FunctionName": "arn:aws:lambda:ap-northeast-1:YOUR_AWSACCOUNT_ID:function:pinpoint-stepfunctions-dev-create-sms-campagin-fucntion", "Payload.$": "$" } } } } ] } } }
インポート処理ではLambda Invoke
で先程作成したセグメントインポート用のLambdaファンクションを指定します。
出力はフィルタリングして$.Payload
を抽出しています。
次にEmail/SMS向けのキャンペーン作成を並列実行するために、Parallel state
を設定していきます。
最後に、キャンペーン作成用のLambdaファンクションをParallel state配下に設定していきます。
ここではEmailとSMS向けのキャンペーン作成を並列実行しています。
前段のセグメントインポート処理でインポート量が多い場合はLambda実行完了後もPinpoint側でインポート処理が続いていることがあるので、エラー処理で再試行処理を追加していきます。
リトライ時の待機時間の考え方についてはこちらのブログをご参考にしてください。
S3
Step Functionsをトリガーするために、CSVをアップロードするS3バケットのEventBridgeの通知設定を有効にします。
EventBridge
S3にファイルが置かれた場合にStep FunctionsをトリガーするためのEventBridgeを設定します。
イベントパターンにはオブジェクト作成時にトリガーされるよう設定します。
{ "detail-type": ["Object Created"], "source": ["aws.s3"], "detail": { "bucket": { "name": ["YOUR_BUCKET_NAME"] } } }
イベントのターゲットにはさきほど作成したStep Functionsを指定します。
実行
サンプルのセグメント情報を記載したCSVファイルをS3にアップロードします。
ChannelType,Address,User.UserAttributes.Name SMS,+8180xxxxxxxx,たかやまSMS EMAIL,[email protected],たかやまメール1 EMAIL,[email protected],たかやまメール2 EMAIL,[email protected],たかやまメール3
ファイルがアップロードされるとStep Functionsが実行されることが確認できます。
Pinpoint側にも日付プレフィックスのついたセグメントとキャンペーンが作成されていることが確認できます。
メールとSMSの配信も無事確認できました。
Appendix : Pinpoint APIでの実装例
今回Pinpointの作成リソースに日付プレフィックスをつけるためにLambdaを利用しましたが、リソース名が固定値またはアップロードのCSVファイル名を付与するような場合はStep Functionsで直接Pinpoint APIを定義する方法も可能です。
Step Functions定義
{ "StartAt": "import-segment", "States": { "import-segment": { "Next": "parallel", "Type": "Task", "Resource": "arn:aws:states:::aws-sdk:pinpoint:createImportJob", "Parameters": { "ApplicationId": "YOUR_PINPOINT_APPLICATION_ID", "ImportJobRequest": { "DefineSegment": true, "Format": "CSV", "RegisterEndpoints": true, "RoleArn": "arn:aws:iam::YOUR_AWSACCOUNT_ID:role/service-role/pinpoint-events", "S3Url.$": "States.Format('s3://{}/{}', $.detail.bucket.name,$.detail.object.key)", "SegmentName.$": "States.Format('segment-{}', $.detail.object.key)" } } }, "parallel": { "Type": "Parallel", "End": true, "Branches": [ { "StartAt": "create-campaign-for-email-pinpointapi", "States": { "create-campaign-for-email-pinpointapi": { "End": true, "Retry": [ { "ErrorEquals": [ "States.ALL" ], "IntervalSeconds": 2, "MaxAttempts": 5, "BackoffRate": 2 } ], "Type": "Task", "Resource": "arn:aws:states:::aws-sdk:pinpoint:createCampaign", "Parameters": { "ApplicationId": "YOUR_PINPOINT_APPLICATION_ID", "WriteCampaignRequest": { "Name": "campaign", "Schedule": { "StartTime": "IMMEDIATE" }, "SegmentId.$": "$.ImportJobResponse.Definition.SegmentId", "SegmentVersion": 1, "TemplateConfiguration": { "EmailTemplate": { "Name": "test-template" } } } } } } }, { "StartAt": "create-campaign-for-sms-pinpointapi", "States": { "create-campaign-for-sms-pinpointapi": { "End": true, "Retry": [ { "ErrorEquals": [ "States.ALL" ], "IntervalSeconds": 2, "MaxAttempts": 5, "BackoffRate": 2 } ], "Type": "Task", "Resource": "arn:aws:states:::aws-sdk:pinpoint:createCampaign", "Parameters": { "ApplicationId": "YOUR_PINPOINT_APPLICATION_ID", "WriteCampaignRequest": { "Name": "campaign", "Schedule": { "StartTime": "IMMEDIATE" }, "SegmentId.$": "$.ImportJobResponse.Definition.SegmentId", "SegmentVersion": 1, "TemplateConfiguration": { "SmsTemplate": { "Name": "test-template" } } } } } } } ] } } }
最後に
Step Functionsを利用したキャンペーン自動配信の仕組みを作成しました。
今回の例では、セグメントファイルアップデート後に即座にキャンペーン配信されるような流れになりますが、キャンペーン作成時に配信時間の指定もできるのでユースケースに合わせ設定いただければと思います。
以上、たかやまでした。